package com.free.bsf.elasticsearch.impl;

import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.text.SimpleDateFormat;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.List;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.free.bsf.core.base.Callable;
import com.free.bsf.core.db.DbConn;
import com.free.bsf.core.util.*;
import com.free.bsf.elasticsearch.base.Param;
import lombok.val;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.client.Requests;
import org.elasticsearch.client.transport.TransportClient;

import com.alibaba.druid.pool.DruidDataSource;
import com.fasterxml.jackson.core.type.TypeReference;
import com.free.bsf.core.serialize.JsonSerializer;
import com.free.bsf.elasticsearch.ElasticSearchProperties;
import com.free.bsf.elasticsearch.base.ElasticSearchAware;
import com.free.bsf.elasticsearch.mapper.MappingBuilder;

public class ElasticSearchSqlProvider implements AutoCloseable {

	private TransportClient transportClient;
	private DruidDataSource druidDataSource;
	protected ElasticSearchProperties elasticSearchProperties;
	protected MappingBuilder mappingBuilder = new MappingBuilder();
	protected static String DATA_FORMAT="yyyy-MM-dd'T'HH:mm:ss";

	public ElasticSearchProvider getElasticSearchProvider(){
		return ContextUtils.getBean(ElasticSearchProvider.class,false);
	}

	public ElasticSearchSqlProvider(ElasticSearchProperties elasticSearchProperties, TransportClient transportClient, DruidDataSource druidDataSource){
		this.elasticSearchProperties = elasticSearchProperties;
		this.transportClient = transportClient;
		this.druidDataSource = druidDataSource;
	}

	public void createIndex(String index, String type) {
		ElasticSearchMonitor.hook().run("createIndex", () -> {
			getClient().prepareIndex(index, type).setSource().get();
		});
	}

	/**
	 * 功能描述：新建索引
	 *
	 * @param indexName 索引名
	 * @param type  类型
	 * @param documentClazz mapping规则
	 */
	public void createIndex(String indexName, String type, Class<?> documentClazz) {
		if (documentClazz != null) {
			ElasticSearchMonitor.hook().run("createIndex", () -> {
				getClient().admin().indices().prepareCreate(indexName).addMapping(type, mappingBuilder.buildMapping(documentClazz)).get();
			});
		} else {
			createIndex(indexName, type);
		}
	}

	/**
	 * 功能描述：设置mapping
	 *
	 * @param indexName 索引名
	 * @param type  类型
	 * @param documentClazz mapping规则
	 */
	public void putMapping(String indexName, String type, Class<?> documentClazz) {
		if (documentClazz != null) {
			ElasticSearchMonitor.hook().run("putMapping", () -> {
				PutMappingRequest mappingRequest = Requests.putMappingRequest(indexName).type(type).source(mappingBuilder.buildMapping(documentClazz));
				getClient().admin().indices().putMapping(mappingRequest).actionGet();
			});
		}
	}

	public <T> List<T> searchListBySql(String sql,Param param, Class<T> clazz){
		return searchListBySql(param.build(sql),clazz);
	}

	protected <T> ParameterizedType getListType(Class<T> clazz){
		val type = new ParameterizedType() {
			public String getTypeName() {
				return List.class.getTypeName();
			}

			public Type[] getActualTypeArguments() {
				return new Type[]{clazz};
			}

			public Type getRawType() {
				return List.class;
			}

			public Type getOwnerType() {
				return null;
			}
		};
		return type;
	}

	public <T> List<T> searchListBySql(String sql, Class<T> clazz) {
		return ElasticSearchMonitor.hook().run("searchBySql", () -> {
			//val type2 = objectMapper.getTypeFactory().constructParametricType(List.class,clazz);

			return pintSql(sql,()->{
				try(DbConn conn=new DbConn(this.druidDataSource)) {
					val rs = conn.executeList(sql, new Object[]{});
					List<T> list = fromJson(toJson(rs), getListType(clazz));
					return list;
			}});
		});
	}

	public <T> T searchObjectBySql(String sql,Param param, Class<T> clazz){
		return searchObjectBySql(param.build(sql),clazz);
	}

	public <T> T searchObjectBySql(String sql, Class<T> clazz) {
		return ElasticSearchMonitor.hook().run("searchBySql", () -> {
			return pintSql(sql,()->{
				try(DbConn conn=new DbConn(this.druidDataSource)){
					return ConvertUtils.tryConvert(conn.executeScalar(sql, new Object[]{}),clazz);
			}});
		});
	}

	public void deleteBySql(String sql,Param param){
		deleteBySql(sql,param);
	}

	public void deleteBySql(String sql) {
		ElasticSearchMonitor.hook().run("deleteBySql", () -> {
			pintSql(sql,()->{
				try(DbConn conn=new DbConn(this.druidDataSource)){
					try {
						conn.getConn().prepareStatement(sql).executeQuery();
					}catch (Exception e){
						if((e.getCause()!=null&&e.getCause()instanceof NullPointerException))
						{
							//底层不返回值
						}else {
							throw new ElasticsearchException(new DbConn.DbException("es deleteBySql异常",sql,e));
						}
					}
					return true;
				}
			});
		});
	}

	public <T extends ElasticSearchAware> void insertData(String index, String type, Collection<T> coll) {
		ElasticSearchMonitor.hook().run("insertData", () -> {
			int size = coll.size();
			if (size < this.elasticSearchProperties.getBulkSize()) {
				coll.forEach(item -> buildRequest(index, type, item).get());
			} else {
				bulkInsertData(index, type, coll);
			}
		});
	}

	private <T extends ElasticSearchAware> void bulkInsertData(String index, String type, Collection<T> coll) {
		BulkRequestBuilder bulkRequest = getClient().prepareBulk();
		coll.forEach((item) -> {
			bulkRequest.add(buildRequest(index, type, item));
		});
		bulkRequest.get();
	}

	protected static ObjectMapper objectMapper = JsonSerializer.createObjectMapper()
			.setDateFormat(new SimpleDateFormat(DATA_FORMAT));

	private <T extends ElasticSearchAware> IndexRequestBuilder buildRequest(String index, String type, T obj) {
		String id = obj.get_id();
		val json = toJson(obj);
		HashMap<String,Object> jsonObject = fromJson(json, new TypeReference<HashMap<String,Object>>(){}.getType());
	//JSONObject.parseObject(json);
		if (StringUtils.isEmpty(id)) {
			return getClient().prepareIndex(index, type).setSource(jsonObject);
		} else {
			return getClient().prepareIndex(index, type).setId(id).setSource(jsonObject);
		}
	}

	@Override
	public void close() {
		if(transportClient!=null)
		{try{transportClient.close();}catch (Exception e){}}
		if(druidDataSource!=null)
		{try{druidDataSource.close();}catch (Exception e){}}
	}

	private TransportClient getClient() {
		return this.transportClient;
	}

	public String formatDate(Date date){
		return DateFormatUtils.format(date,DATA_FORMAT);
	}

	protected String toJson(Object obj){
		JsonSerializer jsonSerializer = new JsonSerializer();
		jsonSerializer.setObjectMapper(objectMapper);
		String json = jsonSerializer.serialize(obj);
		return json;
	}
	protected <T>T fromJson(String json,Type type){
		JsonSerializer jsonSerializer = new JsonSerializer();
		jsonSerializer.setObjectMapper(objectMapper);
		return jsonSerializer.deserialize(json,type);
	}

	protected <T> T pintSql(String sql,Callable.Func0<T> func0){
		try{
			return func0.invoke();
		}catch (Throwable e){
			if(PropertyUtils.getPropertyCache(ElasticSearchProperties.BsfElasticSearchPrintSql,true)){
				LogUtils.info(ElasticSearchSqlProvider.class,ElasticSearchProperties.Project,sql);
			}
			throw e;
		}
	}

}
